#!/usr/bin/env python3
# Copyright 2019-2022 Alibaba Group Holding Limited.
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause

import subprocess
import sh
import os
import sys
import time
import psutil
from glob import glob
import logging
import colorlog

# Colorized console logging on the root logger: cyan timestamp, level name in
# the per-level color, white message text.
handler = logging.StreamHandler()
handler.setFormatter(
    colorlog.ColoredFormatter(
        fmt='%(cyan)s%(asctime)s%(reset)s %(log_color)s%(levelname)s %(white)s%(message)s%(reset)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
)
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)

class TestCPUThrottle:
    def setup_class(self):
        print("CPU throttle test")
        self.cgpath = '/sys/fs/cgroup/cpu/test'
        sh.tee(sh.echo(1), '/sys/fs/cgroup/cpu/cgroup.clone_children')
        sh.mkdir(self.cgpath)

    def init_cgroup(self):
        self.child = sh.bash('-c', 'while :; do :; done', _bg=True, bg_exc=False)
        sh.tee(sh.echo(self.child.pid), '%s/cgroup.procs' % self.cgpath)

    def set_cfs_quota(self, t_us):
        sh.echo(t_us, _out='%s/cpu.cfs_quota_us' % self.cgpath)

    def test_all(self):
        self.install_module()
        self.init_cgroup()
        self.set_cfs_quota('50000')
        self.check_le_75()
        self.check_after_load()
        self.set_cfs_quota('100000')
        self.check_gt_75()
        self.check_after_unload()

    def check_le_75(self):
        cpu_util = self.get_cpu_util(self.child.pid)
        self.validate_lt(cpu_util, 75)

    def install_module(self):
        scheduler_rpm = glob(os.path.join('/tmp/work', 'scheduler*.rpm'))
        if len(scheduler_rpm) != 1:
            print("Please check your scheduler rpm");
            self.teardown_class()
            sys.exit(1)
        scheduler_rpm = scheduler_rpm[0]
        sh.rpm('-ivh', scheduler_rpm)
        sh.tee(sh.echo(0), '/sys/kernel/plugsched/plugsched/enable')

    def uninstall_module(self):
        sh.rpm('-e', 'scheduler-xxx')

    def check_after_load(self):
        sh.tee(sh.echo(1), '/sys/kernel/plugsched/plugsched/enable')
        cpu_util = self.get_cpu_util(self.child.pid)
        self.validate_lt(cpu_util, 75)

    def check_gt_75(self):
        cpu_util = self.get_cpu_util(self.child.pid)
        self.validate_gt(cpu_util, 75)

    def check_after_unload(self):
        sh.tee(sh.echo(0), '/sys/kernel/plugsched/plugsched/enable')
        cpu_util = self.get_cpu_util(self.child.pid)
        self.validate_gt(cpu_util, 75)

    def get_cpu_util(self, pid):
        return psutil.Process(pid).cpu_percent(interval=2)

    def teardown_class(self):
        try:
            self.child.kill()
            self.child.wait()
        except sh.SignalException_SIGKILL:
            pass
        sh.rmdir(self.cgpath)
        if sh.grep(sh.lsmod(), 'scheduler', _ok_code=[0,1]).exit_code == 0:
            self.uninstall_module()

    def validate_lt(self, util, bound):
        if util >= bound:
            self.error_handler('less', util, bound)

    def validate_gt(self, util, bound):
        if util <= bound:
            self.error_handler('greater', util, bound)

    def error_handler(self, expect, util, bound):
        err_msg = 'CPU util is {} but should be {} than {}'.format(util, expect, bound)
        print(err_msg)
        self.teardown_class()
        raise


if __name__ == '__main__':
    # Script entry point: create the cgroup, run the full scenario, then
    # clean up. Failures inside the scenario raise and end the script.
    throttle_test = TestCPUThrottle()
    throttle_test.setup_class()
    throttle_test.test_all()
    throttle_test.teardown_class()
